[SPARK-57544][SQL] Rework column ID validation for nested fields in DSv2#56619
[SPARK-57544][SQL] Rework column ID validation for nested fields in DSv2#56619aokolnychyi wants to merge 4 commits into
Conversation
|
This is related to #55376. |
|
@gengliangwang @juliuszsompolski @andreaschat-db @longvu-db, what do you folks think? |
|
@huaxingao, this is the PR I was talking about on the dev list. |
|
This is not yet complete as it doesn't handle stripping column IDs on CTAS/RTAS. |
0b3f26f to
30864e4
Compare
| .build(); | ||
| } | ||
|
|
||
| /** |
There was a problem hiding this comment.
Switching newly added create methods to a builder. Hasn't been released yet.
| * @param name the name of the column | ||
| * @param dataType the data type of the column | ||
| * @return a new builder | ||
| * @since 4.2.0 |
There was a problem hiding this comment.
Intentional to be cherry-picked into 4.2.
| return this; | ||
| } | ||
|
|
||
| public Builder metadataInJSON(String metadataInJSON) { |
There was a problem hiding this comment.
Probably call it metadata.
There was a problem hiding this comment.
Renamed the method name, intentionally kept the variable name.
gengliangwang
left a comment
There was a problem hiding this comment.
2 blocking, 2 non-blocking, 1 nit.
Clean recursive generalization of the column-ID check, but two blockers: leftover references to the removed v1 machinery will fail tests, and the field-ID error message names the wrong (or no) field.
Design / architecture (2)
- Incomplete removal of the column-ID-v1 machinery:
DSv2IncrementallyConstructedQueryTests(run by the modifiedDataSourceV2DataFrameSuiteand byDataSourceV2DataFrameConnectSuite) still asserts the removedINCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMN_ID_MISMATCHat lines 324/414/488 — the path now throwsCOLUMNS_MISMATCH, so thosecheckErrors fail.ComposedColumnIdTableCatalog(now unreferenced anywhere; it is the workaround this PR eliminates) andNullTableIdAndNullColumnIdInMemoryTableCatalogstill cite the removedvalidateColumnIdsin comments. Migrate the trait's assertions toCOLUMNS_MISMATCHand deleteComposedColumnIdTableCatalog. (likely overlaps the known "not yet complete" work) CatalogV2Util.scala:676:FIELD_IDmay ride into user-facingdf.schemametadata — see inline (question).
Correctness (2)
SchemaUtils.scala:453: field-ID error names the parent (or empty) path, not the field whose ID changed — masked by tests asserting the wrong output — see inline.SchemaUtils.scala:544:removeFieldIdsstruct branch always reallocates, contradicting its own Scaladoc — see inline.
Nits: 1 minor item (see inline comments).
PR description suggestions
- Document: the new mechanism (field IDs stored per-field in
StructFieldmetadata and validated withinvalidateSchemaCompatibility), not only the old approach's limitation — the "What changes" section is currently one line. - Document: the removal of the dedicated
validateColumnIdspass and theCOLUMN_ID_MISMATCHerror subclass; field-ID changes now report asCOLUMNS_MISMATCH.
| case Some(otherField) => | ||
| if (checkFieldIds) { | ||
| for (id <- field.id; otherId <- otherField.id if id != otherId) { | ||
| errors += s"${colPath.fullyQuoted} field ID has changed from $id to $otherId" |
There was a problem hiding this comment.
This reports the parent struct's path, not the field whose ID changed. The check runs one level above the recursion that appends field.name, so unlike the sibling errors (formatField for removed/added, and the type/nullability messages emitted inside the recursion) it omits the field name: a top-level column change becomes " field ID has changed from 1 to 99" (empty path), and a nested change names the container — two changed sibling fields then produce indistinguishable messages.
| errors += s"${colPath.fullyQuoted} field ID has changed from $id to $otherId" | |
| errors += s"${(colPath :+ field.name).fullyQuoted} field ID has changed from $id to $otherId" |
The six V2TableUtilSuite expectations (lines 569/587/602/621-622/639/656) assert the current output and will need updating; the explanatory comment at line 586 also carries a non-ASCII em-dash — switch it to ASCII while you're there.
| */ | ||
| def removeFieldIds(dataType: DataType): DataType = dataType match { | ||
| case s: StructType => | ||
| StructType(s.fields.map { field => |
There was a problem hiding this comment.
This branch always rebuilds via StructType(s.fields.map(...)), so it never returns the original s — but the Scaladoc promises "the original instance unchanged when there is nothing to strip," which only the ArrayType/MapType branches honor. InMemoryBaseTable.assignFieldIds (line 945) shows the struct short-circuit: track whether any field changed and return s if none did. Either apply that here or relax the comment.
There was a problem hiding this comment.
I relaxed the comment.
| f = encodeDefaultValue(default, f) | ||
| } | ||
| Option(col.id()).foreach { id => | ||
| f = f.withId(id) |
There was a problem hiding this comment.
Encoding the top-level ID here also rides into the user-facing schema: DataSourceV2Relation.create builds its output from table.columns.asSchema (which calls this) via toAttributes, and FIELD_ID isn't in INTERNAL_METADATA_KEYS, so it looks like it ends up in df.schema field metadata for every column / nested field of an ID-tracking connector. Is that intended? If not, adding FIELD_ID to INTERNAL_METADATA_KEYS (or stripping it at the relation boundary) would keep it internal. I traced the encode path but didn't confirm end-user visibility — could you check?
| * IDs are preserved across type changes, keeping the same column ID through type | ||
| * widening and nested field additions. [[TypeChangeResetsColIdTableCatalog]] overrides | ||
| * this behavior for testing scenarios where type changes should produce a new ID. | ||
| * Existing IDs are preserved: Column → StructType → Column round-trip encodes them in |
There was a problem hiding this comment.
Non-ASCII arrows (U+2192); use ASCII (CLAUDE.md / scalastyle):
| * Existing IDs are preserved: Column → StructType → Column round-trip encodes them in | |
| * Existing IDs are preserved: Column -> StructType -> Column round-trip encodes them in |
| "<errors>" | ||
| ] | ||
| }, | ||
| "COLUMN_ID_MISMATCH" : { |
There was a problem hiding this comment.
Shall we have in-memory-table testings like in #55376 also where we test connector have implemented various scenarios?
There was a problem hiding this comment.
What cases do you mean?
There was a problem hiding this comment.
Like we have ComposedColumnIdTableCatalog, sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala,.... where we mock various connector scenarios, e.g. Connector that provide top-level column ID only, no struct column ID; Connector that provides both top-level col ID and struct col ID,....
There was a problem hiding this comment.
We replaced the approach in ComposedColumnIdTableCatalog, so I am not sure it is valid to keep. The rest of the tests should be in place.
| // semantics of SPARK-54444. | ||
| val dataErrors = V2TableUtil.validateCapturedColumns( |
There was a problem hiding this comment.
We don't need to change validateCaputredColumns here validateLoadedTableInTransaction? This is part of the txn validation https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala#L156
There was a problem hiding this comment.
I think we are calling validateCapturedColumns with checkFieldIds: Boolean = true.
Is this enough?
There was a problem hiding this comment.
@aokolnychyi In that case, IMO it's clearer if we avoid setting a default value for checkFieldIds, so we force all current & future call-sites to be clear with the semantics
There was a problem hiding this comment.
Yes and no, in my view. I removed some default but it makes sense to keep others as we should be validating IDs in most cases unless it is a temp view.
There was a problem hiding this comment.
Removed a few more places for explicit usage.
|
|
||
| object FieldMetadataUtils { | ||
| // Metadata key for the field ID used to track column identity across schema evolution | ||
| val FIELD_ID_METADATA_KEY = "__FIELD_ID" |
There was a problem hiding this comment.
Went for __FIELD_ID instead of FIELD_ID to indicate a Spark internal key.
| AUTO_GENERATED_ALIAS, | ||
| METADATA_COL_ATTR_KEY, | ||
| QUALIFIED_ACCESS_ONLY, | ||
| FIELD_ID_METADATA_KEY, |
There was a problem hiding this comment.
Added here too. It is OK if this field surfaces in spark.table("t").schema but it should not appear in SHOW and has to be dropped in CTAS / RTAS kind of scenarios. Adding here ensures V1 tables creates from DSv2 tables don't inherit these extra columns.
There was a problem hiding this comment.
Actually, let me think more about surfacing these keys in spark.table("t").schema.
There was a problem hiding this comment.
It is OK, that's what we do for default values, for example.
| ref.info.columns, | ||
| mode = ALLOW_NEW_TOP_LEVEL_FIELDS) | ||
| mode = ALLOW_NEW_TOP_LEVEL_FIELDS, | ||
| checkFieldIds = false) |
There was a problem hiding this comment.
This seems reasonable for temp views.
gengliangwang
left a comment
There was a problem hiding this comment.
5 addressed, 0 remaining, 3 new (1 newly introduced, 2 late catches — my misses from earlier rounds).
2 blocking, 0 non-blocking, 1 nit. The rework is clean and all four prior findings plus the nit are resolved — but the "Address feedback" commit's Column.Builder rename left five callers stale, so the branch no longer compiles.
Correctness (2)
- The branch doesn't compile:
Column.Builder.metadataInJSON(...)was renamed tometadata(...), but five callers still use the old name —CatalogV2Util.scala:776/785/794/802andColumnSuite.scala:111. CI is red across all build modules. See inline. TxnTabledropssetVersion(delegate.version())that master sets deliberately; with version-aware table equality this can break transaction re-resolution identity in tests — see inline (late catch).
Nits: 1 minor item — validateCapturedColumns / validateSchemaCompatibility Scaladoc omit the new field-ID check (see inline comments).
| .nullable(f.nullable) | ||
| .comment(comment) | ||
| .defaultValue(defaultValue) | ||
| .metadataInJSON(metadataAsJson(cleanedMetadata)) |
There was a problem hiding this comment.
This doesn't compile. The builder method was renamed metadataInJSON(...) → metadata(...) in this commit (Column.java:270 now exposes only metadata(...)), but this call still uses the old name — as do 785, 794, 802 in this file and ColumnSuite.scala:111. CI is red across every build module (incl. api, catalyst and Precompile Spark).
| .metadataInJSON(metadataAsJson(cleanedMetadata)) | |
| .metadata(metadataAsJson(cleanedMetadata)) |
Apply the same rename on lines 785, 794, 802, and ColumnSuite.scala:111.
| // Column IDs for existing columns are preserved through the StructType round-trip via | ||
| // metadata encoding. assignMissingIds assigns fresh IDs to any new columns added by | ||
| // schema evolution. | ||
| updateColumns(InMemoryBaseTable.assignMissingIds(columns())) |
There was a problem hiding this comment.
master sets setVersion(delegate.version()) right here ("the starting version should be the delegate version"), but this rewrite dropped it along with the old version comment. It looks unrelated to the column-ID change and reads as an accidental collateral deletion.
It matters: InMemoryBaseTable equality/hashCode are version-aware (id() == other.id() && version() == other.version()) and tableVersion defaults to 0, so the wrapper now reports version 0 instead of the delegate's — re-introducing the same identity false-positive the id override just above was added to prevent, now via version. Restore the line unless dropping it was intentional (in which case, what makes the version sync unnecessary now?).
| originCols: Seq[Column], | ||
| mode: SchemaValidationMode = PROHIBIT_CHANGES): Seq[String] = { | ||
| mode: SchemaValidationMode = PROHIBIT_CHANGES, | ||
| checkFieldIds: Boolean = true): Seq[String] = { |
There was a problem hiding this comment.
The doc for this method lists type/nullability changes, removed, and added columns, but not the field-ID check it now performs — which is the headline behavior of this PR. Worth a line here and on SchemaUtils.validateSchemaCompatibility.
|
Updated and rebased. Ready for another round. Looked more into leaking, fixed CTAS / RTAS use cases. |
longvu-db
left a comment
There was a problem hiding this comment.
Thank you very much for working on this Anton!
| "<errors>" | ||
| ] | ||
| }, | ||
| "COLUMN_ID_MISMATCH" : { |
There was a problem hiding this comment.
Like we have ComposedColumnIdTableCatalog, sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/TypeChangeResetsColIdTableCatalog.scala,.... where we mock various connector scenarios, e.g. Connector that provide top-level column ID only, no struct column ID; Connector that provides both top-level col ID and struct col ID,....
| /** | ||
| * Returns a copy of this field with the field ID removed, or this field if no ID is set. | ||
| */ | ||
| def clearId(): StructField = { |
There was a problem hiding this comment.
-
Something tells me we should either return a copy in both scenarios, or in-place modification in both scenarios, for consistency.
-
P/s: if someone calls
field.clearId(), it feels more intuitive as in-place modification for me, but I'm okay with this reutrning a copy -
Also if someone does
a.fieldCopy = field.clearId() (where no ID is set) b.modification_to_the_fieldCopy`
Since currently, we return the field itself, would modification_to_the_fieldCopy modifies both the fieldCopy and field, sort of like indirect modification?
There was a problem hiding this comment.
StructField is immutable by design and can't be modified.
| "INTERNAL_ERROR", | ||
| Map.of("message", | ||
| "Column '" + name + "' cannot have more than one definition of: " + | ||
| "default value, generation expression, identity column spec")); |
There was a problem hiding this comment.
I assume in reality, this error shouldn't happen, so we cannot test it right?
There was a problem hiding this comment.
Shouldn't happen in practice. We do test this in ColumnSuite, though. Just to be safe.
| } | ||
|
|
||
| public Builder clearFieldIds() { | ||
| this.id = null; |
There was a problem hiding this comment.
Is it a problem if the clearId above returns a copy, but clearFieldIds here doesn't?
There was a problem hiding this comment.
This is in the builder where we mutate the temp state before creating Column.
|
|
||
| // Column IDs assigned by the catalog must NOT appear in SHOW CREATE TABLE output. | ||
| val showDDL = getShowCreateDDL(t) | ||
| assert(!showDDL.exists(_.contains("__FIELD_ID")), s"command must not expose column IDs") |
There was a problem hiding this comment.
| assert(!showDDL.exists(_.contains("__FIELD_ID")), s"command must not expose column IDs") | |
| assert(!showDDL.exists(_.contains(FIELD_ID_METADATA_KEY)), s"command must not expose column IDs") |
| // validation gets a chance to compare data types. | ||
| // Connect re-resolves both sides with the new column ID. | ||
| // The delete removes the old column and the add assigns a fresh one, | ||
| // so COLUMNS_MISMATCH fires in classic before schema validation gets a |
There was a problem hiding this comment.
schema validation is part of the COLUMNS_MISMATCH check, so I think it can be confusing to say COLUMNS_MISMATCH fires before schema validation, I think we should instead say "so column IDs check fires"
| test("ReplaceTableAsSelect: field IDs in query schema are not propagated to table columns") { | ||
| val basicCatalog = catalog("testcat").asTableCatalog | ||
| val atomicCatalog = catalog("testcat_atomic").asTableCatalog | ||
| val basicIdentifier = "testcat.table_name" |
There was a problem hiding this comment.
Do we need to have withTable to clean up the identifier?
There was a problem hiding this comment.
We do call withTable at line 674 or am I missing something?
| classOf[ComposedColumnIdTableCatalog].getName) | ||
| .set("spark.sql.catalog.composedidcat.copyOnLoad", "true") | ||
| .set(SQLConf.TIME_TYPE_ENABLED.key, "true") | ||
| .set(InMemoryBaseTable.ASSIGN_COLUMN_IDS, "true") |
There was a problem hiding this comment.
I assume we need this flag ASSIGN_COLUMN_IDS because there are scenarios where we want to test table ID or schema name/data type validation?
There was a problem hiding this comment.
Yeah, the idea to have this flag is to minimize the number of tests that have to be modified to account for column IDs that didn't exist before.
|
|
||
| test("composed nested IDs tolerate nested field reorder end-to-end") { | ||
| val t = "composedidcat.ns1.ns2.tbl" | ||
| test("nested field reorder does not trigger column ID mismatch") { |
There was a problem hiding this comment.
// InMemoryTable does not actually reorder nested struct fields in stored
// data, so the read still returns the original field order. This is fine
// because the purpose of this test is to verify that the column ID check
Based on the comment, InMemoryTable does not actually do the re-ordering, it's a limitation
=> Let's try to fix InMemoryTable so that it actual does the re-ordering?
val nameParts = colPath :+ field.name
if (checkFieldIds) {
for (id <- field.id; otherId <- otherField.id if id != otherId) {
errors += s"${nameParts.fullyQuoted} field ID has changed from $id to $otherId"
}
}
Cuz the schema validation validates nested columns based on name, not position in the struct
=> I'm fine if this name matching mechanism is already tested in some way.
There was a problem hiding this comment.
Removed this test and replaced with a more appropriate one. It will be tricky to implement reordering in the in-memory catalog.
longvu-db
left a comment
There was a problem hiding this comment.
LGTM apart from some comments
Thank you very much for working on this Anton!
| originCols: Seq[Column], | ||
| mode: SchemaValidationMode = PROHIBIT_CHANGES): Seq[String] = { | ||
| mode: SchemaValidationMode = PROHIBIT_CHANGES, | ||
| checkIds: Boolean = true): Seq[String] = { |
There was a problem hiding this comment.
| checkIds: Boolean = true): Seq[String] = { | |
| checkIds: Boolean): Seq[String] = { |
| originMetaCols: Seq[MetadataColumn], | ||
| mode: SchemaValidationMode = PROHIBIT_CHANGES): Seq[String] = { | ||
| mode: SchemaValidationMode = PROHIBIT_CHANGES, | ||
| checkIds: Boolean = true): Seq[String] = { |
There was a problem hiding this comment.
| checkIds: Boolean = true): Seq[String] = { | |
| checkIds: Boolean): Seq[String] = { |
| /** | ||
| * Recursively clears field IDs from a data type. | ||
| */ | ||
| def clearFieldIds(dataType: DataType): DataType = dataType match { |
There was a problem hiding this comment.
Nit: I wonder if we should call array elem, map key/value a "field" also
| val rawSchema = CharVarcharUtils.getRawSchema(removeInternalMetadata(schema), conf) | ||
| val tableSchema = if (forceNullable) rawSchema.asNullable else rawSchema | ||
| CatalogV2Util.structTypeToV2Columns(tableSchema) | ||
| CatalogV2Util.structTypeToV2Columns(tableSchema, keepIds = false) |
There was a problem hiding this comment.
| CatalogV2Util.structTypeToV2Columns(tableSchema, keepIds = false) | |
| CatalogV2Util.structTypeToV2Columns(tableSchema, keepIds) |
There was a problem hiding this comment.
This won't compile, will it?
| "tableName" -> ".*", | ||
| "errors" -> | ||
| """| | ||
| |- `salary` field ID has changed from \d+ to \d+ |
There was a problem hiding this comment.
"salary field ID has changed from \d+ to \d+"
Does something look wrong here? i.e. the field ID didn't seem to change, and also it's a peculiar field ID name
There was a problem hiding this comment.
I think it did change per comment above "reset-id catalog assigns a new ID for the widened column".
| import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, GenerationExpression, Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdAndNullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog} | ||
| import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog | ||
| import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue} | ||
| import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, CachingInMemoryTableCatalog, CatalogV2Util, Column, ColumnDefaultValue, DefaultValue, GenerationExpression, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdAndNullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog} |
There was a problem hiding this comment.
Have we had a test where the Connector implements only the top-level Column ID but not the nested struct field ID? I assume sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullTableIdInMemoryTableCatalog.scala?
There was a problem hiding this comment.
Not sure, maybe not, may need a follow-up PR.
Co-authored-by: Thang Long Vu <107926660+longvu-db@users.noreply.github.com>
|
Thanks, merging to master/4.x |
### What changes were proposed in this pull request? This PR reworks column ID validation for nested fields in DSv2. ### Why are the changes needed? The original implementation detected dropped-and-re-added columns by comparing top-level Column.id() strings in a dedicated validateColumnIds pass, but this approach had no visibility into nested struct fields, array elements, or map keys/values. To work around this limitation, connectors had to encode nested field IDs into the top-level ID string (as demonstrated by ComposedColumnIdTableCatalog), placing an unreasonable burden on connector authors and making the feature fragile by design. The new mechanism stores field IDs in `StructField` metadata and validates within `validateSchemaCompatibility`. ### Does this PR introduce _any_ user-facing change? Yes but it targets unreleased functionality and must be cherry picked to 4.2. ### How was this patch tested? Existing and new tests. ### Was this patch authored or co-authored using generative AI tooling? Claude Code v2.1.183 Closes #56619 from aokolnychyi/spark-57544. Authored-by: Anton Okolnychyi <aokolnychyi@apache.org> Signed-off-by: Gengliang Wang <gengliang@apache.org> (cherry picked from commit 64197c9) Signed-off-by: Gengliang Wang <gengliang@apache.org>
|
Let me look into creating a PR against 4.2. Thanks @gengliangwang @longvu-db! |
What changes were proposed in this pull request?
This PR reworks column ID validation for nested fields in DSv2.
Why are the changes needed?
The original implementation detected dropped-and-re-added columns by comparing top-level Column.id() strings in a dedicated validateColumnIds pass, but this approach had no visibility into nested struct fields, array elements, or map keys/values. To work around this limitation, connectors had to encode nested field IDs into the top-level ID string (as demonstrated by ComposedColumnIdTableCatalog), placing an unreasonable burden on connector authors and making the feature fragile by design.
The new mechanism stores field IDs in
StructFieldmetadata and validates withinvalidateSchemaCompatibility.Does this PR introduce any user-facing change?
Yes but it targets unreleased functionality and must be cherry picked to 4.2.
How was this patch tested?
Existing and new tests.
Was this patch authored or co-authored using generative AI tooling?
Claude Code v2.1.183